home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Personal Computer World 2009 February
/
PCWFEB09.iso
/
Software
/
Resources
/
Chat & Communication
/
Digsby build 37
/
digsby_setup.exe
/
lib
/
common
/
emailaccount.pyo
(
.txt
)
< prev
next >
Wrap
Python Compiled Bytecode
|
2008-10-13
|
20KB
|
538 lines
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
from __future__ import with_statement
from common import AccountBase, profile, netcall, pref, UpdateMixin, FromNetMixin
from util.observe import ObservableList
from common.actions import action
from common.notifications import fire
from urllib import quote
from util.net import UrlQuery
from traceback import print_exc
import os
import shlex
import locale
from subprocess import Popen
from os.path import expandvars
from logging import getLogger
# Module-level logger for this file, plus a shorthand alias for its info() method.
log = getLogger('emailaccount')
info = log.info
from util import urlprotocol, try_this, get_func_name, call_later, callsback
from path import path
from prefs import localprefprop
import util
class EmailAccount(AccountBase, UpdateMixin, FromNetMixin):
    """Base class for email accounts: unread-mail list, state and UI actions.

    The protocol-specific pieces (inbox_url, compose, urlForEmail) raise
    NotImplementedError here and are expected to be supplied by subclasses.

    NOTE(review): this class was reconstructed from decompiled bytecode whose
    indentation was lost; places where the structure had to be inferred are
    marked with NOTE(review) comments below.
    """

    # Not referenced anywhere in this class body; presumably consumed by
    # subclasses or mixins -- TODO confirm.
    retry_time = 3
    error_max = 3

    def __init__(self, enabled=True, updateNow=True, **options):
        """Initialise all three bases and start with an empty mailbox.

        `updateNow` is accepted for interface compatibility but is not used
        by this method.
        """
        AccountBase.__init__(self, **options)
        UpdateMixin.__init__(self, **options)
        FromNetMixin.__init__(self, **options)

        self.emails = ObservableList()
        self.count = 0
        # ids of emails that have already been reported as new
        self.seen = set()
        self._dirty_error = True

        log.info('Created EmailAccount: %r. Setting enabled to %r', self, enabled)
        self.enabled = enabled

    def timestamp_is_time(self, tstamp):
        """Always return True in this base class."""
        return True

    def mailclient_localprefs_key(acct):
        """Build the local-pref key '<protocol>/<username>/mailclient'."""
        return '/'.join([acct.protocol, acct.username, 'mailclient'])

    mailclient = localprefprop(mailclient_localprefs_key, None)

    # The accessors are looked up by name, so they may be defined below.
    email_address = util.iproperty('get_email_address', 'set_email_address')

    def get_email_address(self):
        """Return this account's address decoded from ASCII.

        Falls back to "name@default_domain" (or the bare name when it already
        contains '@') if EmailAddress raises AttributeError, and finally to
        the name decoded with replacement characters.
        """
        from util import EmailAddress
        try:
            return str(EmailAddress(self.name, self.default_domain)).decode('ascii')
        except AttributeError:
            try:
                if '@' in self.name:
                    ret = self.name
                else:
                    ret = self.name + '@' + self.default_domain
                return str(ret).decode('ascii')
            except Exception:
                return self.name.decode('ascii', 'replace')

    def set_email_address(self, val):
        """Setter half of `email_address`; intentionally does nothing."""
        pass

    @property
    def display_name(self):
        """Attribute named by pref 'email.display_attr', else the email address."""
        return try_this(lambda: getattr(self, pref('email.display_attr')),
                        self.email_address)

    def on_error(self, task=None):
        """Count and log an error, then clear the email list.

        `task` is not used by this method.
        """
        self.error_count += 1
        log.error("%r's error count is now: %d", self, self.error_count)
        log.error('on_error called from %s', get_func_name(2))
        del self.emails[:]

    def bad_pw(self):
        """Go offline with reason BAD_PASSWORD and stop the timer."""
        log.info('%r: changing state to BAD_PASSWORD', self)
        self.set_offline(self.Reasons.BAD_PASSWORD)
        self.timer.stop()

    def no_mailbox(self):
        """Go offline with reason NO_MAILBOX and stop the timer."""
        log.info('%r: changing state to NO_MAILBOX', self)
        self.set_offline(self.Reasons.NO_MAILBOX)
        self.timer.stop()

    def __repr__(self):
        """AccountBase's repr with ', enabled>' or ', disabled>' appended."""
        # Drop the closing character of the base repr so the flag goes inside.
        base = AccountBase.__repr__(self)[:-1]
        # NOTE(review): the 'enabled' literal was lost in decompilation and is
        # inferred as the counterpart of 'disabled'.
        if self.enabled:
            flag = 'enabled'
        else:
            flag = 'disabled'
        return base + ', ' + flag + '>'

    @property
    def web_login(self):
        """Truthy when web auto-signin is on and the account is ONLINE/CHECKING."""
        # NOTE(review): the decompiled code had a no-op `if pref(...): pass`
        # that discarded the preference; restored here as an `and`.
        return (pref('privacy.www_auto_signin') and
                self.state in (self.Statuses.ONLINE, self.Statuses.CHECKING))

    def error_link(self):
        """Return (label, callback) for the current offline reason, or None."""
        reason = self.Reasons
        linkref = {
            reason.BAD_PASSWORD: ('Edit Account',
                                  lambda: profile.account_manager.edit(self, True)),
            reason.CONN_FAIL: ('Retry', lambda: self.update_now()),
        }
        return linkref.get(self.offline_reason)

    def sort_emails(self, new=None):
        """Sort self.emails in place, and `new` too when it is given."""
        self.emails.sort()
        if new is not None:
            new.sort()

    def _received_emails(self, emails, inboxCount=None):
        """Replace the email list, notify about unseen mail and go ONLINE.

        `inboxCount`, when not None, is forwarded to _setInboxCount.
        """
        self.emails[:] = list(emails)

        # Anything whose id has not been seen before counts as new.
        new = []
        for email in self.emails:
            if email.id not in self.seen:
                new.append(email)
                self.seen.add(email.id)

        self.sort_emails(new)
        info('%s - %s: %d new emails', self.__class__.__name__, self.name, len(new))

        if inboxCount is not None:
            self._setInboxCount(inboxCount)

        self.new = new
        if new:
            profile.when_active(self.fire_notification)

        self.error_count = 0
        self.change_state(self.Statuses.ONLINE)
        self._dirty_error = True

    def fire_notification(self):
        """Notify about the emails stored in self.new, if there are any."""
        if self.new:
            self._notify_emails(self.new)

    def _notify_emails(self, emails, always_show=None, allow_click=True):
        """Fire an 'email.new' notification for `emails` when enabled."""
        if not self.enabled:
            return
        # NOTE(review): the decompiler scrambled this call and lost the
        # onclick target; self.OnClickEmail is the inferred handler -- confirm.
        if allow_click:
            onclick = self.OnClickEmail
        else:
            onclick = None
        fire('email.new',
             emails=emails,
             onclick=onclick,
             always_show=always_show,
             icon=self.icon)

    def _setInboxCount(self, inboxCount):
        """Store the inbox count via setnotifyif('count', ...)."""
        self.setnotifyif('count', inboxCount)

    @action()
    @callsback
    def OnComposeEmail(self, to='', subject='', body='', cc='', bcc='', callback=None):
        """Open a compose window in the configured mail client or the browser.

        A 'file:' mail client is started directly, 'sysdefault' is handed a
        mailto: URL, and otherwise the URL returned by self.compose is opened
        in the default browser. callback.success() is called afterwards.
        """
        mailclient = self.mailclient
        if mailclient and try_this(lambda: self.mailclient.startswith('file:'), False):
            # strip the leading 'file:' to get the executable path
            os.startfile(mailclient[5:])
        elif mailclient == 'sysdefault':
            fields = (('subject', subject), ('body', body), ('cc', cc), ('bcc', bcc))
            # only non-empty fields go into the mailto: query string
            kw = dict((key, value) for key, value in fields if value)
            query = UrlQuery('mailto:' + quote(to), **kw)
            log.info('OnComposeEmail is launching query: %s' % query)
            os.startfile(query)
        else:
            url = self.compose(to, subject, body, cc, bcc)
            if url:
                import wx
                wx.LaunchDefaultBrowser(url)

        callback.success()

    @property
    def client_name(self):
        """Display name for whatever will be used to read this account's mail."""
        mc = self.mailclient
        if mc in (None, True, False):
            return self.protocol_info().name
        if mc.startswith('file:'):
            return path(mc).basename().title()
        if mc == 'sysdefault':
            return ''
        log.warning('unknown mailclient attribute in %r: %s', self, mc)
        return _('Email Client')

    @action()
    def OnClickInboxURL(self, e=None):
        """Open the inbox in the mail client, or in the browser if none is set."""
        if self.mailclient:
            url = self.start_client_email()
            if url is None:
                # the mail client handled it; nothing to open in the browser
                return None
        else:
            url = self.inbox_url

        import wx
        # Launch the URL chosen above; the decompiled code computed `url`
        # but then ignored it and always re-read self.inbox_url.
        wx.LaunchDefaultBrowser(url)

    OnClickHomeURL = OnClickInboxURL

    def OnClickEmail(self, email):
        """Open `email` in the mail client or browser, then maybe drop it."""
        if self.mailclient:
            self.start_client_email(email)
        else:
            import wx
            wx.LaunchDefaultBrowser(self.urlForEmail(email))

        # NOTE(review): placement after the if/else (rather than inside the
        # else branch) is inferred; the decompiled indentation was lost.
        cname = self.__class__.__name__
        if self.web_login and cname not in ('IMAPMail', 'PopMail'):
            self._remove_email(email)

    @callsback
    def OnClickSend(self, to='', subject='', body='', cc='', bcc='', callback=None):
        """Send via self.send_email when it exists, else fall back to composing."""
        sender = getattr(self, 'send_email', self.OnComposeEmail)
        sender(to=to, subject=subject, body=body, cc=cc, bcc=bcc, callback=callback)

    def start_client_email(self, email=None):
        """Start the configured mail client; returns None."""
        log.info('mailclient: %s', self.mailclient)

        if self.mailclient == 'sysdefault':
            launch_sysdefault_email(email)
        elif try_this(lambda: self.mailclient.startswith('file:'), False):
            filename = self.mailclient[5:]
            if os.path.exists(filename):
                os.startfile(filename)
            else:
                log.warning('cannot find %s', filename)

    def __len__(self):
        """Return self.count (not len(self.emails))."""
        return self.count

    def __iter__(self):
        """Iterate over self.emails."""
        return iter(self.emails)

    can_has_preview = False

    @property
    def icon(self):
        """Skin entry 'serviceicons.<protocol>', or None if the lookup fails."""
        from gui import skin
        from util import try_this
        return try_this(lambda: skin.get('serviceicons.%s' % self.protocol), None)

    @property
    def inbox_url(self):
        """Not implemented in this base class."""
        raise NotImplementedError

    def observe_count(self, callback):
        """Register `callback` for 'count' changes and email list changes."""
        self.add_gui_observer(callback, 'count')
        self.emails.add_gui_observer(callback)

    def unobserve_count(self, callback):
        """Undo observe_count."""
        self.remove_gui_observer(callback, 'count')
        self.emails.remove_gui_observer(callback)

    def observe_state(self, callback):
        """Register `callback` for 'enabled' and 'state' changes."""
        self.add_gui_observer(callback, 'enabled')
        self.add_gui_observer(callback, 'state')

    def unobserve_state(self, callback):
        """Undo observe_state."""
        self.remove_gui_observer(callback, 'enabled')
        self.remove_gui_observer(callback, 'state')

    @property
    def header_funcs(self):
        """List of (label, callable) pairs: Inbox and Compose."""
        return [
            ('Inbox', self.OnClickInboxURL),
            ('Compose', self.OnComposeEmail),
        ]

    def _get_options(self):
        """Return the options reported by UpdateMixin.get_options."""
        return UpdateMixin.get_options(self)

    def update_info(self, **info):
        """Apply changed attributes, push the account to the profile, and
        reset state when password, username or server changed."""
        flush_state = False

        # Decompiled as a bare __enter__() call with an empty finally;
        # restored to the `with` block it was compiled from.
        with self.frozen():
            for k, v in info.iteritems():
                if k in ('password', 'username', 'server') and getattr(self, k, None) != v:
                    flush_state = True
                self.setnotifyif(k, v)

        profile.update_account(self)

        if flush_state:
            log.info('Resetting state for %r', self)
            self._reset_state()

        self._dirty_error = True

    def _reset_state(self):
        """Placeholder; returns NotImplemented."""
        return NotImplemented

    def update(self):
        """Move the state machine forward in preparation for a mail check."""
        # NOTE(review): this compares a bound method with the class attribute;
        # confirm the comparison can ever be true.
        if self.update == EmailAccount.update:
            log.warning('not implemented: %s.update', self.__class__.__name__)
            raise NotImplementedError

        if not self.enabled:
            return None

        log.info('%s (%s) -- preparing for update. update called from: %s',
                 self, self.state, get_func_name(2))

        state = self.state
        if state == self.Statuses.OFFLINE:
            self.change_state(self.Statuses.CONNECTING)
        elif state == self.Statuses.ONLINE:
            self.change_state(self.Statuses.CHECKING)
        elif state == self.Statuses.CONNECTING:
            # Still connecting: tolerated after errors, otherwise give up.
            if not self.error_count:
                log.error('%s -- called update while connecting, and no errors! disconnecting...', self)
                self.set_offline(self.Reasons.CONN_FAIL)
        else:
            log.error('Unexpected state for update: %r', self.state)

    @action(lambda self: self.state != self.Statuses.CHECKING)
    def update_now(self):
        """Schedule update() via netcall and reset the timer to updatefreq."""
        netcall(self.update)
        self.timer.reset(self.updatefreq)

    @action()
    def tell_me_again(self):
        """Re-show the notification, with a 'No unread mail' placeholder entry
        when the email list is empty."""
        if self.emails:
            emails = self.emails
            allow_click = True
        else:
            from mail.emailobj import Email
            emails = [Email(id=-1, fromname=_('No unread mail'))]
            allow_click = False

        self._notify_emails(emails, always_show=['Popup'], allow_click=allow_click)

    @action()
    def auth(self):
        """Schedule self.authenticate via netcall."""
        netcall(self.authenticate)

    def Connect(self):
        """Clear the offline reason and schedule an update 1.5 (call_later units) from now."""
        self.change_reason(self.Reasons.NONE)
        call_later(1.5, self.update)

    @action()
    def compose(self, to, subject, body, cc, bcc):
        """Not implemented in this base class."""
        raise NotImplementedError

    def urlForEmail(self, email):
        """Not implemented in this base class."""
        raise NotImplementedError

    @action()
    def open(self, email_message):
        """Raise NotImplementedError on a bare EmailAccount; otherwise a no-op."""
        if type(self) is EmailAccount:
            raise NotImplementedError

    def _remove_email(self, email_message):
        """Remove `email_message` from the list and decrement the count."""
        try:
            self.emails.remove(email_message)
        except ValueError:
            # Not in the list (already removed): leave the count alone so it
            # is not decremented twice for the same message.
            pass
        else:
            self.setnotifyif('count', self.count - 1)

    @action()
    def markAsRead(self, email_message):
        """Drop the message locally."""
        self._remove_email(email_message)

    @action()
    def delete(self, email_message):
        """Drop the message locally."""
        self._remove_email(email_message)

    @action()
    def archive(self, email_message):
        """Drop the message locally."""
        self._remove_email(email_message)

    @action()
    def reportSpam(self, email_message):
        """Drop the message locally."""
        self._remove_email(email_message)
import re
# Matches %NAME%-style environment variable references (shortest match).
# The capturing group makes re.split() keep the matched pieces in its result.
env_re = re.compile('(%.*?%)')
def mailclient_error():
    """Fire an 'error' notification saying no system email client is set up."""
    title = _('No System Email Client')
    msg = _('No system email client is configured.')
    fire('error', title=title, msg=msg, details='')
def mailclient_launch_error(msg):
    """Fire an 'error' notification, with `msg` as its text, for a mail
    client that failed to launch."""
    title = _('Error launching system mail client')
    fire('error', title=title, msg=msg, details='')
def envexpand(s):
    """Expand %NAME% environment references in `s`.

    Each %NAME% is rewritten to ${NAME} and the result is passed through
    os.path.expandvars. Pieces with no name between the percent signs
    ('%%', or a lone '%') are left untouched instead of being turned into
    the meaningless '${}'.
    """
    pieces = []
    for part in env_re.split(s):
        # len > 2 guarantees at least one character between the % signs
        if len(part) > 2 and part.startswith('%') and part.endswith('%'):
            part = '${' + part[1:-1] + '}'
        pieces.append(part)
    return expandvars(''.join(pieces))
def launch_sysdefault_email(email = None):
    """Launch the program registered for the 'mailto' URL protocol.

    On wxMSW, when is_vista() is true, this delegates to
    launch_sysdefault_email_vista. `email` is only forwarded to that
    function; it is not otherwise used here.

    Returns True when the process was started, False when no mail client is
    registered, and None when launching failed (an error notification is
    fired in that case).
    """
    import wx
    if 'wxMSW' in wx.PlatformInfo:
        from gui.native.win.winutil import is_vista
        if is_vista():
            return launch_sysdefault_email_vista(email)

    mailclient = urlprotocol.get('mailto')
    log.info('mail client is %r', mailclient)

    if not mailclient.strip():
        mailclient_error()
        return False

    mailclient = envexpand(mailclient)

    try:
        args = shlex.split(mailclient.encode(locale.getpreferredencoding()))
        # keep only the executable; the registered arguments are dropped
        args = args[:1]
        log.info('launching %r', args)
        if Popen(args):
            return True
    except Exception:
        print_exc()
        # The decompiled handler read e.message from an `e` that it had just
        # set to None; that value was immediately overwritten, so it is
        # simply not fetched here.
        try:
            msg = _('Could not start system mail client %r.') % mailclient
        except Exception:
            # formatting the command into the message failed; use a plain one
            msg = _('Could not start system mail client.')
            print_exc()
        mailclient_launch_error(msg)
def launch_sysdefault_email_vista(email = None):
    """Launch the default mail client named in the Windows registry.

    Reads the client name from HKCU\\Software\\Clients\\mail and its launch
    command from HKLM\\SOFTWARE\\Clients\\Mail\\<name>\\shell\\open\\command.
    `email` is not used by this function.

    Returns True when the process was started. Returns None after firing an
    error notification when the registry lookup or the launch fails.
    """
    import _winreg as reg

    def read_default(root, subkey):
        # Read a key's unnamed (default) value, always releasing the handle.
        key = reg.OpenKey(root, subkey, 0, reg.KEY_READ)
        try:
            return reg.QueryValueEx(key, None)[0]
        finally:
            reg.CloseKey(key)

    try:
        client_name = read_default(reg.HKEY_CURRENT_USER, 'Software\\Clients\\mail')
        log.info('canonical name for mail client: %r', client_name)

        mc_key_name = 'SOFTWARE\\Clients\\Mail\\%s\\shell\\open\\command'
        program = read_default(reg.HKEY_LOCAL_MACHINE, mc_key_name % client_name)
        log.info('mail client shell launch string: %r', program)
    except Exception:
        print_exc()
        return mailclient_error()

    if isinstance(program, unicode):
        import sys
        # shlex.split is given a byte string below
        program = program.encode(sys.getfilesystemencoding())

    try:
        program = envexpand(program)
        args = shlex.split(program)
        log.info('launching vista mail client: %r', args)
        Popen(args)
        return True
    except Exception:
        print_exc()
        mailclient_launch_error(_('Could not start system mail client.'))
if __name__ == '__main__':
    # Run as a script: try launching the registry-configured mail client.
    launch_sysdefault_email_vista()